Real-time aggregate of multiple connectors¶

1- Import the data¶

The data was collected locally over about 2 days, for a few pairs across multiple exchanges, using CCXT's watch_ticker. A possible improvement would be to use watch_trades instead, which is better suited to this task and would also capture the traded volume.
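The cells below rely on the columns connector, symbol, timestamp, update_timestamp and price. As a rough illustration of how one ticker message could map onto such a row, here is a minimal sketch. It assumes the unified CCXT ticker structure (a dict with `symbol`, `timestamp` in milliseconds and `last`) and assumes that update_timestamp is the local time at which the update was recorded. `ticker_to_row`, the connector name and the sample values are hypothetical; this is not the collector that produced the dataset.

```python
from datetime import datetime, timezone


def ticker_to_row(connector: str, ticker: dict, received_at: datetime) -> dict:
    """Flatten one CCXT-style ticker dict into a row (hypothetical helper)."""
    return {
        "connector": connector,
        "symbol": ticker["symbol"],
        # Exchange-side time; CCXT reports it in milliseconds since the epoch.
        "timestamp": datetime.fromtimestamp(ticker["timestamp"] / 1000, tz=timezone.utc),
        # Local time at which the update was seen (assumed meaning of update_timestamp).
        "update_timestamp": received_at,
        "price": float(ticker["last"]),
    }


# Made-up ticker message and receipt time, for illustration only.
sample_ticker = {"symbol": "CRV/USDT", "timestamp": 1690988400000, "last": 0.5871}
received = datetime(2023, 8, 2, 15, 0, 0, 120000, tzinfo=timezone.utc)

row = ticker_to_row("binance", sample_ticker, received)
latency = (row["update_timestamp"] - row["timestamp"]).total_seconds()
print(row["symbol"], row["price"], f"latency={latency:.3f}s")
```

The gap between the two timestamps is what later gives the aggregated price a realistic, connector-dependent latency.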

In [2]:
import pandas as pd

data = pd.read_parquet('conector_latency_and_true_price.parquet')

2- Data overview (5 minutes downsample)¶

In [3]:
import datetime
import plotly.express as px

# "5min" replaces the "5T" alias, which is deprecated in recent pandas versions
downsampled = data.set_index('timestamp', drop=False).groupby(['connector', 'symbol']).resample("5min").last()

for symbol in data.symbol.unique():
    symbol_data = downsampled[downsampled.symbol == symbol]
    display(px.line(symbol_data, x='timestamp', y='price', color='connector', title=symbol))

Analysis of CRV/USDT¶

In [4]:
SYMBOL = 'CRV/USDT'
from_index = datetime.datetime(2023, 8, 2, 15, 0)
to_index = datetime.datetime(2023, 8, 2, 16, 0)

data_symbol = data.loc[(data.symbol == SYMBOL) & (data.timestamp > from_index) & (data.timestamp < to_index)]

3- Aggregate algorithm¶

The algorithm estimates a weight for each connector from the size and frequency of its ticker changes: in the code below, the weight is the inverse of the variance of the connector's nonzero price changes. The true price is then the weighted average of each connector's last known price.
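To make the weighting concrete, here is a small standalone sketch of inverse-variance weighting on made-up numbers. The connector names and prices are hypothetical, and it uses the standard library's `statistics.variance` (a sample variance, like the default of pandas' `.var()`) rather than the notebook's DataFrame code.

```python
from statistics import variance

# Made-up price series for two hypothetical connectors.
prices = {
    "quiet_exchange": [1.000, 1.001, 1.001, 1.000, 1.001],
    "noisy_exchange": [1.000, 1.010, 0.990, 0.990, 1.012],
}

weights = {}
for name, series in prices.items():
    # Tick-to-tick price changes
    diffs = [b - a for a, b in zip(series, series[1:])]
    # Ignore ticks where the price did not move
    moves = [d for d in diffs if d != 0]
    # Inverse of the sample variance of the nonzero moves
    weights[name] = 1.0 / variance(moves)

# Weighted average of each connector's last price
last_prices = {name: series[-1] for name, series in prices.items()}
total_weight = sum(weights.values())
true_price = sum(weights[n] * last_prices[n] for n in prices) / total_weight

print(weights)
print(true_price)
```

The connector that moves in small steps ends up with a much larger weight, so the aggregated price stays close to its last price while still being nudged toward the noisier one.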

In [5]:
import numpy as np

weight_by_connector = {}
for connector in data_symbol.connector.unique():
    connector_prices = data_symbol[data_symbol.connector == connector].copy()
    # Tick-to-tick price change for this connector
    connector_prices["price_diff"] = connector_prices["price"].diff()
    # Keep only the ticks where the price actually moved
    connector_prices = connector_prices.loc[connector_prices.price_diff != 0]
    diff = connector_prices["price_diff"]
    # Inverse-variance weight: connectors with smaller price jumps weigh more
    weight_by_connector[connector] = 1.0 / diff.apply(float).var()

# The real prices are indexed by the server-provided timestamp, while the true price
# is computed on update_timestamp, which includes websocket and database insertion latency.
# The true price therefore has a realistic latency that depends on the connector.
real_prices_by_connector = pd.pivot_table(
    data=data_symbol,
    index="timestamp",
    columns="connector",
    values="price"
).ffill()

latency_prices_by_connector = pd.pivot_table(
    data=data_symbol,
    index="update_timestamp",
    columns="connector",
    values="price"
).ffill()


def row_avg(row):
    # Weighted average of the connectors' last known prices at one timestamp
    row = row.dropna()
    weights = pd.Series(
        [weight_by_connector[connector] for connector in row.index], index=row.index
    )
    # Drop connectors whose weight is NaN
    has_weight = weights.notna()
    return np.average(row[has_weight].to_numpy(), weights=weights[has_weight].to_numpy())


latency_true_price = latency_prices_by_connector.apply(row_avg, axis=1)
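The pivot, forward-fill and row-wise average can be checked on a toy frame. The sketch below uses made-up updates from two hypothetical connectors and arbitrary weights; `toy`, `toy_weights` and `weighted_row` are illustrative names, not part of the notebook.

```python
import numpy as np
import pandas as pd

# Made-up updates from two hypothetical connectors, indexed by arrival time.
toy = pd.DataFrame(
    {
        "update_timestamp": pd.to_datetime(
            ["2023-08-02 15:00:00", "2023-08-02 15:00:01", "2023-08-02 15:00:02"]
        ),
        "connector": ["a", "b", "a"],
        "price": [1.00, 1.10, 1.02],
    }
)
toy_weights = pd.Series({"a": 3.0, "b": 1.0})  # arbitrary illustrative weights

# One column per connector; ffill carries each connector's last price forward.
wide = toy.pivot_table(
    index="update_timestamp", columns="connector", values="price"
).ffill()


def weighted_row(row):
    # Connectors that have not reported yet are NaN and are skipped
    row = row.dropna()
    return np.average(row.to_numpy(), weights=toy_weights.reindex(row.index).to_numpy())


toy_true_price = wide.apply(weighted_row, axis=1)
print(toy_true_price)
```

At the first timestamp only connector a has reported, so the result is its price (1.00). From the second timestamp on, both connectors contribute, giving 1.025 and then 1.04.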

In [6]:
latency_true_price_fig = px.line(latency_true_price, x=latency_true_price.index, y=latency_true_price, title=SYMBOL)
connectors_fig = px.line(real_prices_by_connector, x=real_prices_by_connector.index, y=real_prices_by_connector.columns)

def hex_string_to_rgba(hex_string: str, alpha: str):
    hex_string = hex_string.lstrip('#')
    rgb = tuple(int(hex_string[i:i+2], 16) for i in (0, 2, 4))
    color_rgb_str = ', '.join([str(x) for x in rgb])
    return f'rgba({color_rgb_str}, {alpha})'

for trace in connectors_fig.data:
    color = trace.line.color
    trace.line.color = hex_string_to_rgba(color, '0.2')
    latency_true_price_fig.add_trace(trace)

display(latency_true_price_fig)